
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
import sys

reload(sys)
import xlrd
import os

# NOTE(review): `sqlContext` is not defined until later in this script (the
# HiveContext is created further down), so as written these statements raise
# NameError at import time. They most likely belong AFTER the Spark setup
# block below — confirm intended execution order.
df=sqlContext.read.orc("/user/hive/warehouse/dw.db/online_finance_without_extension_and_baidu/pt=201806")
# Expose the June-2018 ORC partition as a temp table for SQL below.
df.registerTempTable("base")
# Materialize dw.test_20180715_1 with rows whose insert_time is on/after
# unix_timestamp 1522512000 (2018-04-01 00:00:00 in the cluster's session
# timezone — presumably CST; verify against the Hive session TZ).
aaa=sqlContext.sql("""create TABLE dw.test_20180715_1 AS SELECT  msisdn,
smslabel,
msg_content,
insert_time,
msg_report,
status ,
provider_id  ,
city ,
prov_id ,
area
FROM base WHERE unix_timestamp(insert_time) >= 1522512000""")


# Force UTF-8 as the process-wide default encoding — legacy Python 2 idiom;
# depends on the `reload(sys)` near the top of the file to restore the
# hidden setdefaultencoding attribute.
sys.setdefaultencoding('utf-8')
# Spark setup: snappy compression for parquet writes, YARN client mode.
conf = SparkConf().set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext("yarn-client", "test", conf=conf)
sqlContext = HiveContext(sc)
# NOTE(review): "sqlContest" looks like a typo for "sqlContext"; kept as-is
# because eachFile() below creates its own local SQLContext anyway, so this
# module-level name appears to be effectively unused.
sqlContest = SQLContext(sc)
# Re-read the June-2018 partition and register it for the queries below.
df=sqlContext.read.orc("/user/hive/warehouse/dw.db/online_finance_without_extension_and_baidu/pt=201806")
df.registerTempTable("base")
# Query text: SMS with status=3 delivered since unix_timestamp 1514736000
# (2018-01-01 in the session timezone), anti-joined against three history
# tables (LEFT OUTER JOIN ... WHERE t1.msisdn IS NULL keeps only msisdns
# never seen before), then latest insert_time per msisdn, capped at 180000.
# NOTE(review): this `sql` string is built but never executed in this file;
# the export below reads a pre-built table instead. Confirm whether a
# `sqlContext.sql(sql)` step (populating dw.temp_20180716_result) is missing.
sql="""
	select
		t2.msisdn msisdn,
		max(t2.insert_time) insert_time
	from (
		select
			t.msisdn msisdn ,
			t.insert_time insert_time,
			t.smslabel smslabel
		from
			(SELECT msisdn,insert_time,smslabel
			   from base where unix_timestamp(insert_time) >=1514736000 and status =3 )  t
		LEFT OUTER JOIN
			(SELECT * FROM history.xiakuan UNION  ALL SELECT * FROM history.zhuce UNION  ALL SELECT * FROM history.licai) t1
		ON t.msisdn = t1.msisdn
		WHERE t1.msisdn is null
	) t2
	group by
		t2.msisdn
	limit 180000
"""
# Export the pre-computed result table to a local Excel workbook.
df=sqlContext.sql("select * FROM dw.temp_20180716_result")
writer = pd.ExcelWriter("/home/result/20180717/niwodai.xlsx",engine='xlsxwriter')
# toPandas() collects the entire result to the driver — only safe while the
# result stays small enough to fit in driver memory.
df.toPandas().to_excel(writer, sheet_name='Sheet1')
writer.save()
def eachFile(dir, pt):
    """Load one local Excel sheet into the dw.zhengquan_history Hive table.

    Reads /home/result/<dir>.xlsx into pandas, converts it to a Spark
    DataFrame, normalizes msisdn to string (pandas may have parsed it as a
    number), appends the rows as ORC files under the table's pt=<pt>
    partition directory, and refreshes the partition metadata so Hive can
    see the new files.

    Args:
        dir: basename (without extension) of the Excel file under /home/result.
        pt:  partition value, e.g. "20180625".

    Relies on the module-level `sc` and `sqlContext` created earlier in this
    script.
    """
    read_file_df = pd.read_excel("""/home/result/%(dir)s.xlsx""" % {"dir": dir})
    sqlContest = SQLContext(sc)
    spark_df = sqlContest.createDataFrame(read_file_df)
    spark_df.registerTempTable("tmp_table")
    # Cast msisdn back to string so the column type matches the Hive table.
    df = sqlContext.sql("""select
                cast(msisdn as string) msisdn,
                smslabel smslabel
            from
                tmp_table""")
    df.repartition(10).write.mode("append").orc("/user/hive/warehouse/dw.db/zhengquan_history/pt=%s" % pt)
    # Drop-then-add forces the metastore to register the freshly written files.
    sqlContext.sql("alter table dw.zhengquan_history drop if exists partition (pt=%s)" % pt)
    sqlContext.sql("alter table dw.zhengquan_history add if not exists partition (pt=%s)" % pt)


# BUGFIX(review): this call previously appeared BEFORE the def in this flat
# script, which raises NameError at runtime; the definition now precedes it.
eachFile("2", "20180625")
# load data local inpath '/home/data/20180716/111.txt' into table dw.temp_20180716history.zhuce partition(pt='20180715');



# load data local inpath '/home/data/20180622/20180622.txt' into table history.meirong partition(pt='20180622');



# Loan-success source, June-2018 partition.
df=sqlContext.read.orc("/user/hive/warehouse/dw.db/loan_success/pt=201806")
df.registerTempTable("base")
# Query text: latest verification-code SMS ("验证码") per (msisdn, smslabel)
# since unix_timestamp 1525104000 (2018-05-01 in the session timezone) with
# status=3, for msisdns absent from the xiakuan/zhuce history tables
# (anti-join), capped at 100000 rows.
# BUGFIX(review): the inner derived table was missing its "from (" wrapper —
# the subquery SELECT followed the column list directly, so this SQL could
# not parse. Restored the same structure as the first query in this file.
# NOTE(review): this string is still never executed here — confirm whether a
# `sqlContext.sql(sql)` step was intended.
sql="""
	select
		t2.msisdn msisdn,
		max(t2.insert_time) insert_time,
		t2.smslabel
	from (
		select
			t.msisdn msisdn ,
			t.insert_time insert_time,
			t.smslabel smslabel
		from
			(SELECT msisdn,insert_time,smslabel
			   from base where unix_timestamp(insert_time) >=1525104000 and status =3
              and msg_content like '%验证码%' )  t
		LEFT OUTER JOIN
			(SELECT * FROM history.xiakuan UNION  ALL SELECT * FROM history.zhuce) t1
		ON t.msisdn = t1.msisdn
		WHERE t1.msisdn is null
	) t2
	group by
		t2.msisdn,t2.smslabel
	limit 100000
"""
# Export the smslabel frequency table to a local Excel workbook for review.
df=sqlContext.sql("select * FROM dw.smslabel_count")
writer = pd.ExcelWriter("/home/result/smslabel.xlsx",engine='xlsxwriter')
# toPandas() collects the full result to the driver — only safe for small tables.
df.toPandas().to_excel(writer, sheet_name='Sheet1')
writer.save()